/****************************************************
 * 创建人：@author fengxin    
 * 创建时间: 2019/12/19/9:26
 * 项目名称: risk
 * 文件名称: StreamExample.java
 * 文件描述: 
 *
 * All rights Reserved, Designed By 投资交易团队
 * @Copyright:2016-2019
 *
 ********************************************************/
package com.learn.zmq.stream;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

/**
 * Usage example for {@link SocketType#STREAM} sockets.
 *
 * <p>A STREAM server is bound to a TCP port and is then driven either by another STREAM
 * socket ({@link #testStream2Stream()}) or by a DEALER socket ({@link #testStream2Dealer()}).
 * Because a STREAM socket exchanges raw bytes, the server answers with hand-written
 * ZMTP handshake bytes (see the constants below).
 *
 * <p>For the meaning of the socket options used here see
 * <a href="https://www.cnblogs.com/fengbohello/p/4398953.html">this overview of
 * zmq_setsockopt</a>.
 *
 * <p>Package: {@code com.learn.zmq.stream}<br>
 * Class: {@code StreamExample}
 *
 * @author fengxin
 * @since 2019/12/19
 */
public class StreamExample {

    /**
     * 64-byte greeting sent once to every newly seen peer. The static initializer sets
     * byte 0 to {@code 0xff}, byte 8 to {@code 1}, byte 9 to {@code 0x7f}, byte 10 to
     * {@code 3} and bytes 12-15 to the ASCII text {@code NULL}; every other byte is zero.
     * This layout corresponds to a ZMTP 3.0 greeting using the NULL security mechanism.
     */
    public static final byte[] standardGreeting = new byte[64];

    /**
     * Raw bytes the server sends in reply to a DEALER peer: {@code 4} (command flag),
     * {@code 41} (body size), the length-prefixed name {@code READY}, then the properties
     * {@code Socket-Type = ROUTER} and {@code Identity = ""} (name with a 1-byte length,
     * value with a 4-byte length).
     */
    public static final byte[] readyCommandForRouter = { 4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p',
            'e', 0, 0, 0, 6, 'R', 'O', 'U', 'T', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', 0, 0,
            0, 0 };

    /**
     * Raw bytes describing a READY command with {@code Socket-Type = DEALER}. Not referenced
     * anywhere in this class.
     *
     * <p>NOTE(review): the length bytes here (51, 13, 10) do not match the literal text that
     * follows them ("Socket-Type" is 11 characters, "Identity" is 8), unlike
     * {@link #readyCommandForRouter}. The bytes are left unchanged because the intended use
     * is not visible from this file - confirm before relying on this constant.
     */
    public static final byte[] readyCommandForDealer = { 3, 0, 'N','U','L','L',4,51,5, 'R', 'E', 'A', 'D', 'Y', 13, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p',
            'e', 0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 10, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', 0, 0,
            0, 0 };

    /** Endpoint the STREAM server binds to. */
    private static final String SERVER_BIND_ENDPOINT = "tcp://*:12306";

    /** Endpoint the example clients connect to (same port as the server). */
    private static final String SERVER_CONNECT_ENDPOINT = "tcp://localhost:12306";

    static {
        standardGreeting[0] = (byte) 0xff;
        standardGreeting[8] = 1;
        standardGreeting[9] = 0x7f;
        standardGreeting[10] = 3;
        standardGreeting[12] = 'N';
        standardGreeting[13] = 'U';
        standardGreeting[14] = 'L';
        standardGreeting[15] = 'L';
    }

    /**
     * STREAM server: prints every data frame it receives, answers frames containing
     * {@code "DEALE"} with {@link #readyCommandForRouter}, and sends
     * {@link #standardGreeting} to each peer identity the first time it is seen.
     */
    private static class StreamServer implements IAttachedRunnable {

        /**
         * Runs the receive loop until the thread is interrupted or {@code recv} yields no frame.
         *
         * @param args unused
         * @param ctx  context used to create the STREAM socket
         * @param pipe unused
         */
        @Override
        public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe) {
            ZMQ.Socket streamer = ctx.createSocket(SocketType.STREAM);

            // Linger controls how long unsent messages are kept when the socket is closed:
            // -1 keeps them indefinitely, 0 discards them immediately.
            // See https://www.cnblogs.com/fengbohello/p/4398953.html
            streamer.setLinger(-1);
            System.out.println("StreamServer Linger: " + streamer.getLinger());

            // Bind to the port, then report the endpoint the socket actually ended up on.
            streamer.bind(SERVER_BIND_ENDPOINT);
            System.out.println("server address is: " + streamer.getLastEndpoint());

            // Peers that already received the greeting. Identities are opaque binary data,
            // so they are compared byte-for-byte (ByteBuffer equality is content based)
            // instead of being decoded into a String, which could merge distinct identities.
            Set<ByteBuffer> greetedPeers = new HashSet<>();

            while (!Thread.currentThread().isInterrupted()) {
                // The first frame of every message is the peer identity.
                byte[] peerId = streamer.recv(0);
                if (peerId == null) {
                    // Nothing was received; stop instead of dereferencing null below.
                    break;
                }

                // All following frames of the same message carry the data.
                while (streamer.hasReceiveMore()) {
                    byte[] mainFrame = streamer.recv(0);
                    String msg = new String(mainFrame, ZMQ.CHARSET);
                    if (msg.contains("DEALE")) {
                        // Treated as the READY command of a DEALER peer: answer as a ROUTER.
                        streamer.send(peerId, ZMQ.SNDMORE);
                        streamer.send(readyCommandForRouter, 0);
                    }
                    System.out.println(msg);
                }

                // add() returns true only for an identity not seen before: greet it once.
                if (greetedPeers.add(ByteBuffer.wrap(peerId))) {
                    streamer.send(peerId, ZMQ.SNDMORE);
                    streamer.send(standardGreeting, 0);
                }
            }
        }
    }

    /**
     * STREAM client: connects to the server, consumes the connect message and the greeting,
     * then sends a numbered hello message every two seconds.
     */
    private static class StreamClinet implements IAttachedRunnable {

        /**
         * @param args unused
         * @param ctx  context used to create the STREAM socket
         * @param pipe unused
         */
        @Override
        public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe) {
            ZMQ.Socket streamer = ctx.createSocket(SocketType.STREAM);
            streamer.setLinger(0);
            streamer.connect(SERVER_CONNECT_ENDPOINT);

            // Connect message: identity frame first, then the remaining frames.
            streamer.recv(0);
            printRemainingFrameSizes(streamer, "connect");

            // Greeting from the server: identity frame first, then the remaining frames.
            // This identity is the one used to address the server when sending.
            byte[] peerId = streamer.recv(0);
            printRemainingFrameSizes(streamer, "greeting");

            // Send a numbered message every two seconds.
            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                count++;
                streamer.send(peerId, ZMQ.SNDMORE);
                streamer.send(("stream client hello " + count).getBytes(ZMQ.CHARSET), 0);
                ZMQ.sleep(2);
            }
        }

        /** Receives every remaining frame of the current message and prints its size. */
        private static void printRemainingFrameSizes(ZMQ.Socket socket, String label) {
            while (socket.hasReceiveMore()) {
                byte[] frame = socket.recv(0);
                System.out.println(label + " frame size:" + frame.length);
            }
        }
    }

    /**
     * DEALER client: connects to the STREAM server and sends a numbered hello message
     * every two seconds.
     */
    private static class DealerClinet implements IAttachedRunnable {

        /**
         * @param args unused
         * @param ctx  context used to create the DEALER socket
         * @param pipe unused
         */
        @Override
        public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe) {
            ZMQ.Socket dealer = ctx.createSocket(SocketType.DEALER);
            System.out.println("DealerClinet Linger: " + dealer.getLinger());
            dealer.connect(SERVER_CONNECT_ENDPOINT);

            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                dealer.send(("dealer hello " + count).getBytes(ZMQ.CHARSET), 0);
                count++;
                ZMQ.sleep(2);
            }
        }
    }

    /** Runs the STREAM server together with a STREAM client until the calling thread is interrupted. */
    public static void testStream2Stream() {
        runUntilInterrupted(new StreamServer(), new StreamClinet());
    }

    /** Runs the STREAM server together with a DEALER client until the calling thread is interrupted. */
    public static void testStream2Dealer() {
        runUntilInterrupted(new StreamServer(), new DealerClinet());
    }

    /**
     * Forks {@code server} and {@code client} on a fresh context and blocks the calling
     * thread; without blocking, the context would be closed right away and the example
     * would exit.
     *
     * @param server runnable forked first
     * @param client runnable forked second
     */
    private static void runUntilInterrupted(IAttachedRunnable server, IAttachedRunnable client) {
        try (ZContext ctx = new ZContext()) {
            ZThread.fork(ctx, server);
            ZThread.fork(ctx, client);

            synchronized (ctx) {
                try {
                    // Nothing notifies ctx; looping makes a spurious wakeup harmless,
                    // so only an interrupt ends the wait.
                    while (true) {
                        ctx.wait();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for the caller, then let the context close.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Entry point; runs the STREAM-to-STREAM scenario. Call {@link #testStream2Dealer()}
     * instead to try the STREAM-to-DEALER scenario.
     *
     * @param argv unused
     */
    public static void main(String[] argv) {
        testStream2Stream();
    }
}


